跳至主要内容
版本: 5.0

RocketMQ Connect 快速入门

快速入门

本教程将以独立模式启动 RocketMQ Connector 示例项目“rocketmq-connect-sample”,帮助您理解连接器的运行原理。示例项目提供了一个源连接器,它从源文件读取数据并将其发送到 RocketMQ 集群。它还提供了一个接收连接器,它从 RocketMQ 集群读取消息并将它们写入目标文件。

1. 准备:启动 RocketMQ

  1. Linux/Unix/Mac
  2. 64 位 JDK 1.8+
  3. Maven 3.2.x+
  4. 启动 RocketMQ。可以使用 RocketMQ 4.xRocketMQ 5.x 5.x 版本。
  5. 使用工具测试 RocketMQ 消息发送和接收。

这里使用环境变量 NAMESRV_ADDR 通知工具客户端 RocketMQ 的 NameServer 地址,例如 localhost:9876。

#$ cd distribution/target/rocketmq-4.9.7/rocketmq-4.9.7
$ cd distribution/target/rocketmq-5.1.4/rocketmq-5.1.4

$ export NAMESRV_ADDR=localhost:9876
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId= ...

$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_%d Receive New Messages: [MessageExt...

注意:RocketMQ 具有自动创建主题和组的功能。在发送或订阅消息时,如果相应的主题或组不存在,RocketMQ 将自动创建它们。因此,无需提前创建主题和组。

2. 构建连接器运行时

git clone https://github.com/apache/rocketmq-connect.git

cd rocketmq-connect

export RMQ_CONNECT_HOME=`pwd`

mvn -Prelease-connect -Dmaven.test.skip=true clean install -U

注意:该项目默认包含 rocketmq-connect-sample 的代码,因此无需单独构建 rocketmq-connect-sample 插件。

3. 以独立模式运行连接器工作器

修改配置

修改 connect-standalone.conf 文件以配置 RocketMQ 连接地址和其他信息。有关详细信息,请参阅 9. 配置文件说明

cd $RMQ_CONNECT_HOME/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT

vim conf/connect-standalone.conf

在独立模式下,RocketMQ Connect 将同步检查点信息持久化到本地文件目录 storePathRootDir。

storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot

如果要重置同步检查点,则需要删除持久化的检查点文件。

rm -rf /Users/YourUsername/rocketmqconnect/storeRoot/*

以独立模式启动连接器工作器

sh bin/connect-standalone.sh -c conf/connect-standalone.conf &

提示:可以修改 docker/connect/bin/runconnect.sh 以根据需要调整 JVM 启动参数。

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m"

查看启动日志文件

tail -100f ~/logs/rocketmqconnect/connect_runtime.log

如果运行时启动成功,您将在日志文件中看到以下打印内容

The standalone worker boot success.

要退出 tail -f 命令的日志跟踪模式,可以按 Ctrl + C 键组合。

4. 启动源连接器

创建源文件并写入测试数据

mkdir -p /Users/YourUsername/rocketmqconnect/
cd /Users/YourUsername/rocketmqconnect/
touch test-source-file.txt

echo "Hello \r\nRocketMQ\r\n Connect" >> test-source-file.txt

注意:源文件中不应该有空行(如果遇到空行,演示程序将抛出错误)。源连接器将持续读取源文件,并将每行数据转换为消息体,发送到 RocketMQ 供接收连接器消费。

启动源连接器

curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSourceConnector -d '{
"connector.class": "org.apache.rocketmq.connect.file.FileSourceConnector",
"filename": "/Users/YourUsername/rocketmqconnect/test-source-file.txt",
"connect.topicname": "fileTopic"
}'

如果 curl 请求返回状态 200,则表示创建成功。示例响应

{"status":200,"body":{"connector.class":"org.apache.rocketmq.connect.file.FileSourceConnector","filename":"/Users/YourUsername/rocketmqconnect/test-source-file.txt","connect.topicname":"fileTopic"}}

查看日志文件

tail -100f ~/logs/rocketmqconnect/connect_runtime.log

如果您看到以下日志,则表示文件源连接器已成功启动

Start connector fileSourceConnector and set target state STARTED successed!!

源连接器配置说明

可空默认值描述
connector.classfalse实现 Connector 接口的类名(包括包名)
filenamefalse源文件名(建议使用绝对路径)
connect.topicnamefalse同步文件数据所需的主题

5. 启动接收连接器

curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSinkConnector -d '{
"connector.class": "org.apache.rocketmq.connect.file.FileSinkConnector",
"filename": "/Users/YourUsername/rocketmqconnect/test-sink-file.txt",
"connect.topicnames": "fileTopic"
}'

如果 curl 请求返回状态 200,则表示创建成功。示例响应

{"status":200,"body":{"connector.class":"org.apache.rocketmq.connect.file.FileSinkConnector","filename":"/Users/YourUsername/rocketmqconnect/test-sink-file.txt","connect.topicnames":"fileTopic"}}

查看日志文件

tail -100f ~/logs/rocketmqconnect/connect_runtime.log

如果您看到以下日志,则表示文件接收连接器已成功启动

Start connector fileSinkConnector and set target state STARTED successed!!

检查接收连接器是否已将数据写入目标文件

cat /Users/YourUsername/rocketmqconnect/test-sink-file.txt

如果生成了 test-sink-file.txt 文件,并且其内容与 test-source-file.txt 相同,则表示整个过程运行正常。

继续将测试数据写入源文件 test-source-file.txt

cd /Users/YourUsername/rocketmqconnect/

echo "Say Hi to\r\nRMQ Connector\r\nAgain" >> test-source-file.txt

# Wait a few seconds, check if rocketmq-connect replicate data to sink file succeed
sleep 10
cat /Users/YourUsername/rocketmqconnect/test-sink-file.txt

注意:文件内容的顺序可能会有所不同,因为 rocketmq-connect-sample 在向 RocketMQ 主题发送和接收消息时使用的是 普通消息。这与 有序消息 不同,消费 普通消息 并不保证顺序。

接收连接器配置说明

可空默认值描述
connector.classfalse实现 Connector 接口的类名(包括包名)
filenamefalse接收连接器拉取数据并将其保存到文件(建议使用绝对路径)
connect.topicnamesfalse接收连接器需要处理的数据消息的主题

提示:示例 rocketmq-connect-sample 的配置文件说明仅供参考,不同的源/接收连接器具有不同的配置,请参考具体的源/接收连接器。

6. 停止连接器

停止连接器的 RESTful 命令格式为 http://(您的工作器 IP):(端口)/connectors/(连接器名称)/stop

要停止演示中的两个连接器,可以使用以下命令

curl http://127.0.0.1:8082/connectors/fileSinkConnector/stop
curl http://127.0.0.1:8082/connectors/fileSourceConnector/stop

如果 curl 请求返回状态 200,则表示成功停止连接器。示例响应

{"status":200,"body":"Connector[fileSinkConnector]deleted successfully"}

如果您看到以下日志消息,则表示文件接收连接器已成功关闭

tail -100f ~/logs/rocketmqconnect/connect_default.log

Completed shutdown for connectorName:fileSinkConnector

7. 停止工作器进程

cd $RMQ_CONNECT_HOME/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
sh bin/connectshutdown.sh

8. 日志目录

可以使用以下命令查看日志目录

ls $HOME/logs/rocketmqconnect
ls ~/logs/rocketmqconnect

9. 配置文件说明

根据您的使用情况修改 RESTful 端口、storeRoot 路径、NameServer 地址和其他信息。

以下是一个配置文件示例

#current cluster node uniquely identifies
workerId=DEFAULT_WORKER_1

# Http prot for user to access REST API
httpPort=8082

# Local file dir for config store
storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot

#You need to modify it to your own rocketmq nameserver endpoint.
# RocketMQ namesrvAddr
namesrvAddr=127.0.0.1:9876

# Plugin path for loading Source/Sink Connectors
# The rocketmq-connect project already includes the rocketmq-connect-sample module by default, so no configuration is needed here.
pluginPaths=

storePathRootDir 配置说明

在独立模式下,RocketMQ Connect 将同步检查点信息持久化到 storePathRootDir 指定的本地文件目录。持久化文件包括

描述
connectorConfig.json连接器配置持久化文件
position.json源连接器数据处理进度持久化文件
taskConfig.json任务配置持久化文件
offset.json接收连接器数据消费进度持久化文件
connectorStatus.json连接器状态持久化文件
taskStatus.json任务状态持久化文件